package com.ruyuan.eshop.push.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.message.PlatformHotProductMessage;
import com.ruyuan.eshop.common.message.PlatformHotProductUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.persona.api.PersonaApi;
import com.ruyuan.eshop.persona.page.PersonaConditionWithIdRange;
import com.ruyuan.eshop.push.domain.dto.PersonaFilterConditionDTO;
import com.ruyuan.eshop.push.domain.vo.HotGoodsVO;
import com.ruyuan.eshop.push.mq.producer.DefaultProducer;
import com.ruyuan.eshop.push.splitter.ListSplitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {

    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    @Autowired
    private DefaultProducer producer;

    /**
     * 公用的消息推送线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;
    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                String msg = new String(messageExt.getBody());
                PlatformHotProductUserBucketMessage hotProductMessagePushTask = JSON.parseObject(msg , PlatformHotProductUserBucketMessage.class);
                log.info("执行热门商品推送用户桶消息逻辑，消息内容：{}", hotProductMessagePushTask);

                // 1.获取本次热门商品推送任务分片中的数据
                String hotGoodString = hotProductMessagePushTask.getHotGoodsVO();
                String personaFilterCondition = hotProductMessagePushTask.getPersonaFilterConditionDTO();
                HotGoodsVO hotGoodsVO = JSON.parseObject(hotGoodString, HotGoodsVO.class);
                log.info("选人条件，内容：{}", personaFilterCondition);
                if(Objects.isNull(personaFilterCondition) || Objects.isNull(hotGoodsVO)){
                    continue;
                }
                PersonaFilterConditionDTO conditionDTO = JSON.parseObject(personaFilterCondition,
                        PersonaFilterConditionDTO.class);
                Long startUserId = hotProductMessagePushTask.getStartUserId();
                Long endUserId = hotProductMessagePushTask.getEndUserId();

                // 分页查询条件
                PersonaConditionWithIdRange page = PersonaConditionWithIdRange.builder()
                        .memberLevel(conditionDTO.getMemberLevel())
                        .memberPoint(conditionDTO.getMemberPoint())
                        .startId(startUserId)
                        .endId(endUserId)
                        .build();

                // 2、查询本次推送任务分片对应的用户群体
                // 注意：查询的时候，传入查询条件过滤掉不符合条件的用户id

                JsonResult<List<Long>> queryAccountIdsResult = personaApi.getAccountIdsByIdRange(page);
                List<Long> accountIds = queryAccountIdsResult.getData();

                PlatformHotProductMessage hotMessage = PlatformHotProductMessage.builder()
                        .goodsName(hotGoodsVO.getGoodsName())
                        .goodsDesc(hotGoodsVO.getGoodsDesc())
                        .keyWords(hotGoodsVO.getKeyWords())
                        .build();
                int handledBucketCount = 0;
                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    handledBucketCount++;
                    hotMessage.setAccountId(accountId);
                    log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
                    messages.add(JSON.toJSONString(hotMessage));
                }
                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while(splitter.hasNext()){
                    List<String> sendBatch = splitter.next();
                    log.info("本次批次消息数量,{}",sendBatch.size());
                    sharedSendMsgThreadPool.execute(() -> {
                        producer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC,
                                sendBatch, "平台热门商品定时任务用户桶消息");
                    });
                }
            }
        }catch (Exception e){
            log.error("consume error,热门商品通知消费失败", e);
            // 这边因为是推送任务，个别失败也可以直接丢弃
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private String buildMessage(HotGoodsVO hotGood, Long accountId) {
        PlatformHotProductMessage message = PlatformHotProductMessage.builder()
                .goodsName(hotGood.getGoodsName())
                .goodsDesc(hotGood.getGoodsDesc())
                .keyWords(hotGood.getKeyWords())
                .accountId(accountId)
                .build();

        return JsonUtil.object2Json(message);
    }
    /**
     * 第三方平台推送消息到app
     *
     * @param message
     */
    private void informByPush(HashMap message){
        String messageBody = "速戳!精致小物件，"
                + message.get("keywords")+"！"
                + message.get("goodsName")
                + message.get("goodsDesc");
        log.info("消息推送中：消息内容：{}", messageBody);
    }


}
